# Historical Sync Architecture
Backfills integration data (emails, calendar events, messages, etc.) into dual memory for AI agent context: a structured graph (Postgres GraphNode/GraphEdge) and a semantic vector store (LanceDB with hybrid keyword + embedding search). Supports 3+ months of historical data ingestion with automatic worker scaling.
## Architecture Overview
```
┌──────────────┐      Redis Queue      ┌──────────────┐
│  Web Machine │ ── enqueue ──────→    │ Worker VM #1 │ ── fetch ──→ Microsoft Graph
│  (Next.js +  │                       │ (2GB, 1xCPU) │              Google APIs
│   FastAPI)   │ ←── poll results ──   │              │              Slack, etc.
└──────────────┘                       └──────────────┘
       │                                      │
       │ Fly Machines API                     │ Fly Machines API
       │ (start/stop/create)                  │ (self-report idle)
       ▼                                      ▼
┌──────────────────────────────────────────────────┐
│              Fly.io Infrastructure               │
│  auto_stop_machines: true                        │
│  auto_start_machines: true                       │
│  Worker VMs: 1–3, 2GB each                       │
└──────────────────────────────────────────────────┘
```
## Dual-Memory Architecture
Each ingested record produces two parallel memory stores:
| Memory Type | Storage | Contents | Used For |
|-------------|---------|------|----------|
| Structured | Postgres GraphNode/GraphEdge | Entities + relationships via LLM + rule-based extraction | Knowledge graph queries |
| Semantic | LanceDB | Raw text + vector embeddings | Hybrid keyword + semantic similarity search |
### Per-Chunk Ingestion Flow (100 records)
```
For each record in chunk:
  |
  +---> _extract_structured_entities() --> Postgres DiscoveredEntity
  |       * Rule-based from metadata (from, to, subject)
  |       * Linked to EntityTypeDefinition during schema discovery
  |       * Promoted to GraphNode on entity type activation
  |
  +---> graphrag.ingest_document() --> Postgres GraphNode/GraphEdge
  |       * LLM extraction via tenant BYOK key
  |       * Entities: people, orgs, topics
  |       * Idempotent (content hash dedup)
  |
  +---> Collect text in chunk_texts[]

After chunk (batched):
  |
  +---> asyncio.gather(lancedb.add_document() x 100) --> LanceDB (semantic memory)
          * 100 concurrent embedding API calls
          * ~2s total vs 20s sequential
          * Raw text + vector embeddings
          * Direct store, no transfer pipeline
```
### Why Batched?
Per-record lancedb.add_document() calls the OpenAI embedding API synchronously: 100 records × ~200ms ≈ 20s of blocking calls per chunk, stalling heartbeat updates and risking the reaper's 15-minute timeout.
asyncio.gather() runs all 100 calls concurrently, completing in ~2s.
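A minimal sketch of the batched write, assuming add_document() is an async coroutine and chunk_texts holds dicts with text and metadata keys (both assumptions; the real signature may differ):

```python
import asyncio

async def flush_chunk_to_lancedb(lancedb_store, chunk_texts: list[dict]) -> None:
    # Fire all embedding + write calls at once instead of awaiting each
    # in sequence; total latency is roughly one API round-trip.
    tasks = [
        lancedb_store.add_document(text=rec["text"], metadata=rec["metadata"])
        for rec in chunk_texts
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    failures = [r for r in results if isinstance(r, Exception)]
    if failures:
        # Report partial failures without silently dropping records.
        raise RuntimeError(f"{len(failures)}/{len(tasks)} LanceDB writes failed")
```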
### Memory Lifecycle
1. **Semantic memory**: LanceDB — raw text + embeddings stored directly during ingestion. Hybrid keyword + vector search. No transfer pipeline.
2. **Structured memory**: Postgres — GraphNode/GraphEdge for knowledge graph queries, DiscoveredEntity for pre-promotion staging. Permanent.
## Components
### 1. Job Lifecycle (core/historical_sync_service.py)
**States:** pending → running → completed / failed / paused / cancelled
- start_historical_sync() — validates plan tier, ACU quota, enqueues to Redis
- _process_sync_job() — background task: fetches records, extracts entities, persists to GraphRAG
- cancel_sync() — marks job cancelled; processing loop checks this on each chunk
- resume_sync() — re-enqueues to Redis worker queue from checkpoint
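A minimal sketch of the legal status transitions, assuming statuses are stored as plain strings; the transition map itself is an inference from the functions above, not confirmed by the source:

```python
# Allowed status transitions, inferred from the lifecycle above.
# resume_sync() re-enqueues, so paused goes back through pending.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"running", "cancelled"},
    "running": {"completed", "failed", "paused", "cancelled"},
    "paused": {"pending", "cancelled"},
}

def transition(current: str, new: str) -> str:
    # completed / failed / cancelled are terminal: no outgoing edges.
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {new}")
    return new
```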
**Chunk processing (per chunk; sketched after this list):**
1. Memory check (worker VMs only)
2. fetch_paginated_records() from integration
3. Entity/relationship extraction via _extract_structured_entities()
4. Persist to GraphRAG via ingest_structured_data()
5. Schema discovery for new entity types
6. Checkpoint save, heartbeat update, progress broadcast
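A minimal sketch of that loop, with collaborators injected so the snippet stands alone; function names follow the steps above but their real signatures are assumptions:

```python
from datetime import datetime, timezone

async def process_chunks(db, job, fetch_page, extract_entities, persist_graph):
    # Memory check (step 1) and schema discovery (step 5) omitted for brevity.
    # Resume from the last checkpoint if one exists.
    page_token = (job.checkpoint_data or {}).get("page_token")
    while True:
        if job.status == "cancelled":          # cancel_sync() flips this flag
            return
        records, page_token = await fetch_page(page_token, limit=100)  # step 2
        if not records:
            break
        for record in records:
            extract_entities(record)           # step 3
        await persist_graph(records)           # step 4: GraphRAG persist
        job.checkpoint_data = {"page_token": page_token}   # step 6: checkpoint
        job.last_heartbeat = datetime.now(timezone.utc)    # step 6: heartbeat
        job.records_processed += len(records)
        db.commit()
```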
### 2. Worker Queue (core/sync_job_queue.py)
Redis-backed priority queue with:
- **Sorted set** for priority + FIFO ordering
- **Job locking** (SETNX with 5-minute TTL) — prevents duplicate processing (see the sketch after this list)
- **Dead-letter queue** for failed jobs with retry support
- **Queue metrics** (depth, idle time) for autoscaling decisions
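A minimal sketch of the job lock using redis-py; the key prefix is illustrative. SET with nx and ex is the atomic modern equivalent of SETNX plus EXPIRE:

```python
import redis

LOCK_TIMEOUT = 300  # 5-minute TTL, matching sync_job_queue.py

def acquire_job_lock(r: redis.Redis, job_id: str, worker_id: str) -> bool:
    # Only one worker wins; the lock self-expires if that worker dies.
    return bool(r.set(f"sync:lock:{job_id}", worker_id, nx=True, ex=LOCK_TIMEOUT))

def release_job_lock(r: redis.Redis, job_id: str, worker_id: str) -> None:
    # Only release a lock we still own, so a stale worker can't clobber
    # a newer holder. (A Lua script would make this check-and-delete atomic.)
    key = f"sync:lock:{job_id}"
    if r.get(key) == worker_id.encode():
        r.delete(key)
```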
### 3. Worker Process (workers/sync_worker.py)
- Polls Redis every 1 second via dequeue()
- Acquires job lock before processing
- Creates fresh DB session per job (SessionLocal()) — prevents connection leaks
- **Never self-shuts down** — polls forever, Fly.io manages VM lifecycle
- SIGTERM/SIGINT handlers for graceful shutdown
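A minimal sketch of the worker loop, with the queue, job handler, and SessionLocal-style factory injected (their real interfaces are assumptions):

```python
import asyncio
import signal

async def worker_main(queue, process_job, session_factory):
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)  # graceful shutdown (Unix only)

    # No self-shutdown path: when idle we keep polling and let Fly.io
    # stop the machine.
    while not stop.is_set():
        job = await queue.dequeue()             # assumed to return None if empty
        if job is None:
            await asyncio.sleep(1)              # POLL_INTERVAL
            continue
        if not queue.acquire_lock(job.id):      # another worker won the race
            continue
        db = session_factory()                  # fresh session per job
        try:
            await process_job(db, job)
        finally:
            db.close()                          # prevents connection leaks
            queue.release_lock(job.id)
```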
### 4. Autoscaling (core/sync_job_queue.py + core/startup_tasks.py)
**Scale-up** (on start_historical_sync):
- ensure_worker_running() checks if workers exist, starts stopped ones, or creates new
- If queue_depth > 5 and fewer than 3 workers are running → scale_up() → Fly API creates a new 2GB worker VM
**Scale-down** (every 5 minutes via maintenance loop):
- autoscale_workers() checks idle time
- If idle >= 5 minutes and more than 1 worker is running → scale_down() → destroys the newest worker
- **Always keeps at least 1 worker**
**Max capacity:** 3 concurrent workers, each processing 100 records/chunk
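The decision logic distilled into a sketch; constants match the Key Configuration section, but the function itself is illustrative:

```python
MAX_WORKERS = 3
SCALE_UP_QUEUE_DEPTH = 5
SCALE_DOWN_IDLE_MINUTES = 5

def autoscale_decision(queue_depth: int, running_workers: int,
                       idle_minutes: float) -> str:
    # Scale-up check runs on start_historical_sync; scale-down runs in
    # the 5-minute maintenance loop.
    if queue_depth > SCALE_UP_QUEUE_DEPTH and running_workers < MAX_WORKERS:
        return "scale_up"        # Fly API creates a new 2GB worker VM
    if idle_minutes >= SCALE_DOWN_IDLE_MINUTES and running_workers > 1:
        return "scale_down"      # destroy the newest worker, always keep >= 1
    return "hold"
```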
### 5. Reaper (core/startup_tasks.py)
Runs every 5 minutes on the web machine:
- Finds jobs with status = "running" and last_heartbeat older than 15 minutes
- Marks them cancelled with "Abandoned (server restart or timeout)"
- Jobs set last_heartbeat before entering the processing loop (initial) and after each chunk
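A minimal sketch of one reaper pass, assuming a SQLAlchemy session and the HistoricalSyncJob model sketched under Database Schema below:

```python
from datetime import datetime, timedelta, timezone

HEARTBEAT_TIMEOUT = timedelta(minutes=15)

def reap_abandoned_jobs(db) -> int:
    cutoff = datetime.now(timezone.utc) - HEARTBEAT_TIMEOUT
    abandoned = (
        db.query(HistoricalSyncJob)
        .filter(
            HistoricalSyncJob.status == "running",
            HistoricalSyncJob.last_heartbeat < cutoff,
        )
        .all()
    )
    for job in abandoned:
        # Matches the "Abandoned" cancellations in the failure-mode table.
        job.status = "cancelled"
        job.last_error = "Abandoned (server restart or timeout)"
    db.commit()
    return len(abandoned)
```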
### 6. Integration Fetch (integrations/outlook_service.py)
- **Token refresh** via Microsoft OAuth endpoint (30s timeout)
- **Page fetch** via Microsoft Graph API with:
- 30s aiohttp timeout
- Retry on 504 Gateway Timeout (1 retry, 2s backoff)
- Retry on asyncio.TimeoutError (1 retry, 2s backoff)
- **Compound page tokens** ("channel||nextLink") for email + calendar pagination
- **Lazy token refresh** — checks expiry before each page fetch
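A minimal sketch of that retry policy with aiohttp; URL handling and token plumbing are simplified, with one retry on 504 or timeout and a 2s backoff:

```python
import asyncio
import aiohttp

async def fetch_graph_page(session: aiohttp.ClientSession, url: str,
                           token: str) -> dict:
    timeout = aiohttp.ClientTimeout(total=30)   # 30s per request
    for attempt in (1, 2):
        try:
            async with session.get(
                url,
                headers={"Authorization": f"Bearer {token}"},
                timeout=timeout,
            ) as resp:
                if resp.status == 504 and attempt == 1:
                    await asyncio.sleep(2)      # single retry on gateway timeout
                    continue
                resp.raise_for_status()
                return await resp.json()
        except asyncio.TimeoutError:
            if attempt == 2:
                raise                           # surfaced to the job as an error
            await asyncio.sleep(2)              # single retry on timeout
    raise RuntimeError("unreachable")
```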
## Timeouts and Resilience
| Operation | Timeout | Retry | Fallback |
|-----------|---------|-------|----------|
| MS Graph API page fetch | 30s | 1 (504/timeout) | Returns error to job |
| MS OAuth token refresh | 30s | None | Returns None → auth failure |
| Fly Machines API calls | 10s | None | Falls back to in-process |
| Reaper heartbeat check | 15 min | N/A | Cancels abandoned jobs |
| DB session pool timeout | 30s | N/A | Raises TimeoutError |
## Key Configuration
```toml
# fly.toml
[[vm]]
memory = "2gb"
cpus = 1
processes = ["worker"]

[http_service]
auto_stop_machines = true
auto_start_machines = true
```

```python
# sync_job_queue.py
SCALE_UP_QUEUE_DEPTH = 5     # Scale up if > 5 jobs waiting
SCALE_DOWN_IDLE_MINUTES = 5  # Scale down if idle 5+ min
LOCK_TIMEOUT = 300           # 5 min job lock TTL

# sync_worker.py
POLL_INTERVAL = 1            # Redis poll every 1 second
```
## Database Schema
historical_sync_jobs table:
- id (UUID) — primary key
- tenant_id (VARCHAR) — tenant isolation
- integration_id (VARCHAR) — e.g., "outlook", "slack"
- source_connection_id (VARCHAR) — UserConnection ID
- status — pending / running / completed / failed / paused / cancelled
- progress_percentage, records_processed
- entities_extracted, relationships_extracted
- last_heartbeat — updated before loop and after each chunk
- checkpoint_data (JSONB) — for resumability
- last_error, error_count, max_retries
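A minimal SQLAlchemy mapping of those columns; a sketch only, since the real model may differ in types and defaults:

```python
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class HistoricalSyncJob(Base):
    __tablename__ = "historical_sync_jobs"

    id = Column(UUID(as_uuid=True), primary_key=True)
    tenant_id = Column(String, nullable=False, index=True)   # tenant isolation
    integration_id = Column(String, nullable=False)          # e.g. "outlook"
    source_connection_id = Column(String, nullable=False)    # UserConnection ID
    status = Column(String, default="pending")
    progress_percentage = Column(Integer, default=0)
    records_processed = Column(Integer, default=0)
    entities_extracted = Column(Integer, default=0)
    relationships_extracted = Column(Integer, default=0)
    last_heartbeat = Column(DateTime(timezone=True))         # reaper watches this
    checkpoint_data = Column(JSONB)                          # resumability
    last_error = Column(String)
    error_count = Column(Integer, default=0)
    max_retries = Column(Integer)
```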
## Common Failure Modes and Fixes
| Symptom | Likely Cause | Fix |
|---------|-------------|-----|
| Job stuck pending | Worker VM stopped, not restarted | ensure_worker_running() wakes it |
| Job running with 0/0 for minutes | Token refresh or Graph API hanging | 30s timeout + 504 retry |
| Job cancelled "Abandoned" | Reaper killed it — no heartbeat | Initial heartbeat before loop |
| Job paused | Memory threshold on web machine | Memory check only on worker VMs |
| Job failed with auth error | Expired refresh token | Terminal failure, UI shows Reconnect |
| Worker self-stops mid-job | Old idle-shutdown bug | Removed self-shutdown entirely |